Speed up streamed-proto query output by distributing work to multiple threads #24305

Open
wants to merge 9 commits into
base: master

keithl-stripe (Contributor) commented Nov 12, 2024

This is a proposed fix for #24304

This speeds up a fully warm bazel query ... by 54%, reducing wall time from 1m49s to 50s

Current state:

$ time bazel query '...' --output=streamed_proto > queryoutput4.streamedproto

real    1m48.768s
user    0m27.410s
sys     0m19.646s

This PR:

$ time bazel query '...' --output=streamed_proto > queryoutput5.streamedproto

real    0m49.938s
user    0m22.897s
sys     0m16.161s

💁‍♂️ Note: when combined with #24298, total wall time is 37s, an overall reduction of 66%.

github-actions bot added the team-Performance and awaiting-review labels Nov 12, 2024
try {
bout.writeTo(out);
} catch (IOException e) {
throw new RuntimeException(e);
Collaborator

Suggested change
- throw new RuntimeException(e);
+ throw new WrappedIOException(e);

Contributor Author

good catch!! fixed.
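
For readers following the suggestion above: a lambda inside a stream pipeline cannot throw a checked `IOException`, so a dedicated unchecked wrapper lets the caller unwrap the original exception afterwards. The following is a minimal sketch of that pattern, not the PR's code. The `WrappedIOException` class here is a local stand-in defined only for this example, and `writeAll` is a hypothetical helper.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;

final class WrappedIoSketch {
  /** Local stand-in for the wrapper named in the review; the PR's class may differ. */
  static final class WrappedIOException extends RuntimeException {
    WrappedIOException(IOException cause) {
      super(cause);
    }

    // Covariant override so callers get the IOException back without a cast.
    @Override
    public IOException getCause() {
      return (IOException) super.getCause();
    }
  }

  /** Writes every chunk to out; wraps inside the lambda and unwraps outside it. */
  static void writeAll(List<byte[]> chunks, OutputStream out) throws IOException {
    try {
      chunks.forEach(
          chunk -> {
            try {
              out.write(chunk);
            } catch (IOException e) {
              throw new WrappedIOException(e);
            }
          });
    } catch (WrappedIOException e) {
      throw e.getCause(); // surface the original checked exception to the caller
    }
  }
}
```

Catching only the dedicated wrapper, rather than every `RuntimeException`, keeps unrelated unchecked failures from being mistaken for I/O errors.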

}

@Override
public synchronized InterruptedException getCause() {
Collaborator

Does this need to be synchronized? Same for the other wrapper.

Contributor Author

thanks; good catch!


private static ByteArrayOutputStream writeDelimited(Build.Target targetProtoBuffer) {
try {
var bout = new ByteArrayOutputStream(targetProtoBuffer.getSerializedSize() + 10);
Member

Can you leave a comment on why 10?

jin (Member) commented Nov 13, 2024

Sorry, one more concern with H/T from @michajlo. Can you please make sure that the non-determinism from the parallel iterations doesn't break builds with --order_output=<auto|deps|full>, otherwise enable this optimization only if --order_output=no?

There are relevant tests in https://cs.opensource.google/bazel/bazel/+/master:src/test/java/com/google/devtools/build/lib/buildtool/QueryIntegrationTest.java to validate this, or add more if the tests aren't sufficient to cover the new #processOutput logic.

cc @zhengwei143 too, who worked on query output ordering before.

jin (Member) commented Nov 13, 2024

@michaeledgar too.

keithl-stripe (Contributor Author) commented Nov 13, 2024

> Sorry, one more concern with H/T from @michajlo. Can you please make sure that the non-determinism from the parallel iterations doesn't break builds with --order_output=<auto|deps|full>, otherwise enable this optimization only if --order_output=no?

Good point. I updated the code to use forEachOrdered :) I don't see any tests for ordering streamed protos, but forEachOrdered is specified well enough that I don't think I need to add any new tests right now.
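
To illustrate the ordering guarantee being relied on here: on a parallel stream, `map` may run on any worker in any order, while `forEachOrdered` performs its action one element at a time in encounter order. This is a minimal standalone sketch with made-up names (`formatInOrder`, the `"target-"` prefix), not the PR's formatter.

```java
import java.util.ArrayList;
import java.util.List;

final class OrderedParallelSketch {
  /** Formats items in parallel but appends them to the result in encounter order. */
  static List<String> formatInOrder(List<Integer> items) {
    List<String> written = new ArrayList<>();
    items.parallelStream()
        .map(i -> "target-" + i) // may run on any worker thread, in any order
        .forEachOrdered(written::add); // one element at a time, in encounter order
    return written;
  }
}
```

The plain `ArrayList` is safe in this sketch because `forEachOrdered` specifies that the action for one element happens-before the action for the next; with `forEach` the same code would be a data race.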

try {
var bout =
new ByteArrayOutputStream(
targetProtoBuffer.getSerializedSize() + MAX_BYTES_FOR_VARINT32_ENCODING);
Collaborator

Rather than always having a 10 byte margin, you can use CodedOutputStream.computeUInt32SizeNoTag to compute the exact size of the varint.

However, maybe you should just bypass going through the overhead of ByteArrayOutputStream and simply write to a byte array? Something like this?

var serializedSize = targetProtoBuffer.getSerializedSize();
var headerSize = CodedOutputStream.computeUInt32SizeNoTag(serializedSize);
var output = new byte[headerSize + serializedSize];
targetProtoBuffer.writeTo(CodedOutputStream.newInstance(output, headerSize, output.length - headerSize));
return output;
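
As a dependency-free illustration of the exact-size idea: a length-delimited record is a base-128 varint holding the payload length, followed by the payload. The snippet as quoted reserves `headerSize` bytes at the front of the array, so the length varint still has to be written into them. The sketch below does both steps by hand in plain Java; `varintSize` and `frame` are hypothetical names, and real code would presumably use `CodedOutputStream` instead.

```java
final class DelimitedFrameSketch {
  /** Number of bytes a base-128 varint needs for a non-negative int. */
  static int varintSize(int value) {
    int size = 1;
    while ((value & ~0x7F) != 0) {
      size++;
      value >>>= 7;
    }
    return size;
  }

  /** Returns [varint length][payload] in one exactly sized array. */
  static byte[] frame(byte[] payload) {
    int headerSize = varintSize(payload.length);
    byte[] output = new byte[headerSize + payload.length];
    int pos = 0;
    int remaining = payload.length;
    while ((remaining & ~0x7F) != 0) {
      output[pos++] = (byte) ((remaining & 0x7F) | 0x80); // low 7 bits, continuation bit set
      remaining >>>= 7;
    }
    output[pos++] = (byte) remaining; // final byte, continuation bit clear
    System.arraycopy(payload, 0, output, pos, payload.length);
    return output;
  }
}
```

For example, a 300-byte payload gets the two-byte header `0xAC 0x02`, giving a 302-byte frame with no spare margin.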

Contributor Author

I am up for this, but I'd argue this makes the code harder to read and maintain, and may not have any noticeable performance benefit. It also might have subtle bugs and could require more rigorous testing compared with the PR in its current form.

Contributor

Instead of doing the length-delimiting here, you can just serialize to byte array (targetProtoBuffer.toByteArray()), wrap the query output stream with CodedOutputStream, and then instead of writing the bytes directly to out, do codedOut.writeByteArrayNoTag(serializedBytes), which should be equivalent to writing length-delimited protos. So putting it all together, roughly...

OutputCallback ... {
   private final CodedOutputStream codedOut = CodedOutputStream.newInstance(out, MAYBE_BUFFER_SIZE);

   ...
   ... processOutput(... targets) {
      Streams...(targets)
          .map(t -> toProto(t).toByteArray())
          // synchronized...
          .forEach(b -> codedOut.writeByteArrayNoTag(b));
   }

   ... close(...) {
     // codedOut is buffered, so make sure it gets flushed. note that you'll now need to deal
     // with this possibly throwing (eg because the underlying output stream was closed early)
     codedOut.flush();
   }
}

Contributor Author

I went with your first suggestion, so I didn't need a big comment explaining why writing a byte array without a tag was equivalent to writeDelimitedTo :)

Contributor Author

Unfortunately it crashes when run on our 700k-target workspace. This highlights a bigger problem with my approach: there is a risk of building up a huge backlog of objects to be written and OOMing like this.

I know other parts of the codebase use RxJava - would it be acceptable to do so here? Otherwise I could try to come up with some kind of throttling or batching system.


FATAL: bazel ran out of memory and crashed. Printing stack trace:
java.lang.OutOfMemoryError
        at java.base/jdk.internal.reflect.DirectConstructorHandleAccessor.newInstance(Unknown Source)
        at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Unknown Source)
        at java.base/java.lang.reflect.Constructor.newInstance(Unknown Source)
        at java.base/java.util.concurrent.ForkJoinTask.getThrowableException(Unknown Source)
        at java.base/java.util.concurrent.ForkJoinTask.reportException(Unknown Source)
        at java.base/java.util.concurrent.ForkJoinTask.invoke(Unknown Source)
        at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateParallel(Unknown Source)
        at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(Unknown Source)
        at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source)
        at java.base/java.util.stream.ReferencePipeline.forEachOrdered(Unknown Source)
        at com.google.devtools.build.lib.query2.query.output.StreamedProtoOutputFormatter$1.processOutput(StreamedProtoOutputFormatter.java:54)
        at com.google.devtools.build.lib.query2.engine.OutputFormatterCallback.process(OutputFormatterCallback.java:54)
        at com.google.devtools.build.lib.query2.engine.OutputFormatterCallback.processAllTargets(OutputFormatterCallback.java:81)
        at com.google.devtools.build.lib.query2.query.output.QueryOutputUtils.output(QueryOutputUtils.java:75)
        at com.google.devtools.build.lib.runtime.commands.QueryCommand.doQuery(QueryCommand.java:180)
        at com.google.devtools.build.lib.runtime.commands.QueryEnvironmentBasedCommand.execInternal(QueryEnvironmentBasedCommand.java:186)
        at com.google.devtools.build.lib.runtime.commands.QueryEnvironmentBasedCommand.exec(QueryEnvironmentBasedCommand.java:89)
        at com.google.devtools.build.lib.runtime.BlazeCommandDispatcher.execExclusively(BlazeCommandDispatcher.java:664)
        at com.google.devtools.build.lib.runtime.BlazeCommandDispatcher.exec(BlazeCommandDispatcher.java:244)
        at com.google.devtools.build.lib.server.GrpcServerImpl.executeCommand(GrpcServerImpl.java:573)
        at com.google.devtools.build.lib.server.GrpcServerImpl.lambda$run$1(GrpcServerImpl.java:641)
        at io.grpc.Context$1.run(Context.java:566)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.OutOfMemoryError: Java heap space
        at java.base/java.lang.StringConcatHelper.newString(Unknown Source)
        at java.base/java.lang.StringConcatHelper.simpleConcat(Unknown Source)
        at java.base/java.lang.invoke.DirectMethodHandle$Holder.invokeStatic(DirectMethodHandle$Holder)
        at java.base/java.lang.invoke.LambdaForm$MH/0x000000080008c000.invoke(LambdaForm$MH)
        at java.base/java.lang.invoke.Invokers$Holder.linkToTargetMethod(Invokers$Holder)
        at com.google.devtools.build.lib.cmdline.RepositoryName.getDisplayForm(RepositoryName.java:317)
        at com.google.devtools.build.lib.cmdline.PackageIdentifier.getDisplayForm(PackageIdentifier.java:230)
        at com.google.devtools.build.lib.cmdline.Label.getDisplayForm(Label.java:445)
        at com.google.devtools.build.lib.packages.LabelPrinter$2.toString(LabelPrinter.java:66)
        at com.google.devtools.build.lib.query2.query.output.ProtoOutputFormatter.lambda$toTargetProtoBuffer$2(ProtoOutputFormatter.java:248)
        at com.google.devtools.build.lib.query2.query.output.ProtoOutputFormatter$$Lambda/0x000000080066a278.accept(Unknown Source)
        at com.google.common.collect.ImmutableList.forEach(ImmutableList.java:422)
        at com.google.common.collect.RegularImmutableSortedSet.forEach(RegularImmutableSortedSet.java:89)
        at com.google.devtools.build.lib.query2.query.output.ProtoOutputFormatter.toTargetProtoBuffer(ProtoOutputFormatter.java:248)
        at com.google.devtools.build.lib.query2.query.output.ProtoOutputFormatter.toTargetProtoBuffer(ProtoOutputFormatter.java:173)
        at com.google.devtools.build.lib.query2.query.output.StreamedProtoOutputFormatter$1.toProto(StreamedProtoOutputFormatter.java:64)
        at com.google.devtools.build.lib.query2.query.output.StreamedProtoOutputFormatter$1$$Lambda/0x00000008006631a0.apply(Unknown Source)
        at java.base/java.util.stream.ReferencePipeline$3$1.accept(Unknown Source)
        at com.google.common.collect.CollectSpliterators$1WithCharacteristics.lambda$forEachRemaining$1(CollectSpliterators.java:72)
        at com.google.common.collect.CollectSpliterators$1WithCharacteristics$$Lambda/0x0000000800663860.accept(Unknown Source)
        at java.base/java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Unknown Source)
        at com.google.common.collect.CollectSpliterators$1WithCharacteristics.forEachRemaining(CollectSpliterators.java:72)
        at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source)
        at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source)
        at java.base/java.util.stream.ForEachOps$ForEachOrderedTask.doCompute(Unknown Source)
        at java.base/java.util.stream.ForEachOps$ForEachOrderedTask.compute(Unknown Source)
        at java.base/java.util.concurrent.CountedCompleter.exec(Unknown Source)
        at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
        at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)
        at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
        at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
        at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)

Collaborator

How about using a BlockingQueue? You can give it a large but limited buffer size and if the consumer side falls behind, producers will block instead of racing towards an OOM.
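
A minimal sketch of the backpressure idea, assuming a single producer thread and a single consumer: `ArrayBlockingQueue.put` blocks once the queue holds `capacity` items, so the producer cannot run ahead of the writer without bound. The class name, `pump`, and the `POISON` sentinel are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final class BoundedQueueSketch {
  private static final byte[] POISON = new byte[0]; // sentinel marking end of input

  /** Moves items through a bounded queue; at most `capacity` items wait at once. */
  static List<byte[]> pump(List<byte[]> items, int capacity) throws InterruptedException {
    BlockingQueue<byte[]> queue = new ArrayBlockingQueue<>(capacity);
    Thread producer =
        new Thread(
            () -> {
              try {
                for (byte[] item : items) {
                  queue.put(item); // blocks while the queue is full
                }
                queue.put(POISON);
              } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
              }
            });
    producer.start();
    List<byte[]> consumed = new ArrayList<>();
    try {
      // Compare by identity so an empty payload is never mistaken for the sentinel.
      for (byte[] next = queue.take(); next != POISON; next = queue.take()) {
        consumed.add(next); // stand-in for out.write(next)
      }
    } finally {
      producer.interrupt(); // has no effect if the producer already finished
      producer.join();
    }
    return consumed;
  }
}
```

The capacity trades memory for slack: a larger queue tolerates a burstier consumer, a smaller one bounds the backlog more tightly.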

Contributor Author

good idea!

Contributor

Based on my experience with the internal code I referenced elsewhere, I think some batching would go a long way - right now we're encountering a lot of overhead by having a separate task for each record + calling to write each record to the stream individually.

Putting it all together, I think the simplest case is to (1) parallelize the formatting and produce batches of byte[]s, then (2) write those to the wire. Next optimization would be pipelining (1) and (2), such that batches of bytes are put on the wire close to when they're produced so they don't stick in memory too long. Then the next optimization would be to make this all async such that (1) and (2) are happening continuously in parallel with query processing... This however gets pretty complex, so I'd be interested to see how the implementation here shakes out.
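
The simplest case described above, steps (1) and (2) without pipelining, could be sketched roughly as follows. This is an illustration only: strings with a newline stand in for serialized protos, and `formatBatch` and `writeBatched` are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class BatchedWriteSketch {
  /** Formats one batch into a single byte[] so the writer issues one write per batch. */
  static byte[] formatBatch(List<String> batch) {
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    for (String item : batch) {
      // Stand-in for serializing one record.
      byte[] bytes = (item + "\n").getBytes(StandardCharsets.UTF_8);
      buf.write(bytes, 0, bytes.length);
    }
    return buf.toByteArray();
  }

  static void writeBatched(List<String> items, int batchSize, OutputStream out)
      throws IOException {
    int batches = (items.size() + batchSize - 1) / batchSize;
    // Step (1): format batches in parallel; the collected list keeps batch order.
    List<byte[]> formatted =
        IntStream.range(0, batches)
            .parallel()
            .mapToObj(
                b -> items.subList(b * batchSize, Math.min(items.size(), (b + 1) * batchSize)))
            .map(BatchedWriteSketch::formatBatch)
            .collect(Collectors.toList());
    // Step (2): write sequentially, one call per batch.
    for (byte[] bytes : formatted) {
      out.write(bytes);
    }
  }
}
```

Note that this version holds every formatted batch in memory until step (2) starts, which is exactly what the pipelining step mentioned above is meant to avoid.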

WRT a bounded BlockingQueue - we had something like that which I wound up removing due to it not pulling its weight for the added complexity, but we also have a lot more control over how fast bytes were moved in the situation where it was used, so YMMV.

If we could avoid RxJava here for now I think that would be preferred.

Contributor

I haven't reviewed this pull request in detail, but I nevertheless do have an opinion about RxJava, which is "don't".

We added that dependency before virtual threads were available and it did not live up to our expectations and now we'd much like to cut that dependency if we could. I do realize RxJava does way more than virtual threads, but it's also a whole lot of new concepts to grasp for marginal benefit.

Contributor Author

Thanks for your feedback everyone! I updated the PR with a solution which uses ~minimal memory and is just as performant as my original PR. I don't think it's too complex, but you can be the judge of that :)

.map(StreamedProtoOutputFormatter::writeDelimited)
// I imagine forEachOrdered hurts performance somewhat in some cases. While we may
// not need to actually produce output in order, this code does not know whether
// ordering was requested. So we just always write it in order, and hope performance
Contributor

I think it actually does know, since we have access to options :) options.orderOutput == OrderOutput.NO

Contributor Author

Yes, but there are a few options that together can influence output order AFAICT

for (Target target : partialResult) {
toTargetProtoBuffer(target, labelPrinter).writeDelimitedTo(out);
try {
StreamSupport.stream(partialResult.spliterator(), /* parallel= */ true)
Contributor

I'm not familiar with parallel streams, but I think if this is interrupted then only one of the parallel items sees it and exits, while the rest will carry on. I don't think we want this since it will leave lingering threads doing formatting and writing to the output possibly beyond the output being closed or the command itself having exited. I think encountering an IOException has a similar issue - every thread will keep going and hitting the io exception even after this has exited.

We have a similar parallel formatting implementation for some internal code [1]. IIRC we use close as a synchronization point to make sure that nothing is left behind from processOutput, as well as to reconcile any concurrent or cascading exceptions. What you're trying to do might be different enough that you can avoid this, but I fear you also might not get so lucky.

Footnotes

  1. It is a little too tightly coupled with internal-only code to open-source easily; I tried a while ago.

Contributor Author

Ok, worth investigating. I will test ctrl+c'ing out of it. If needed I will make a little test case to determine runtime behavior of interrupts on parallel streams.

Contributor Author

Ctrl+c does not respond quickly! I will need to find a better solution.

Contributor Author

Note: ctrl+c responds immediately with the current ForkJoinPool-based implementation!

@@ -34,13 +42,107 @@ public String getName() {
public OutputFormatterCallback<Target> createPostFactoStreamCallback(
final OutputStream out, final QueryOptions options, LabelPrinter labelPrinter) {
return new OutputFormatterCallback<Target>() {
private static final int MAX_CHUNKS_IN_QUEUE = Runtime.getRuntime().availableProcessors() * 2;
Contributor Author

I used ×2 to be safe, but I believe this actually just needs to be Runtime.getRuntime().availableProcessors(). Basically we just need to know that, each time the consumer pulls a chunk of byte arrays, some CPU is working on producing one to fill that spot.
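
One way to picture the bound being discussed, as a sketch rather than the PR's actual implementation: keep a queue of pending chunk futures, and before submitting a new chunk once `maxInFlight` are pending, wait for the oldest one and write it. Everything named here (`ChunkPipelineSketch`, `process`, `maxInFlight`) is hypothetical, the sketch assumes `maxInFlight >= 1`, and a list stands in for the output stream.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

final class ChunkPipelineSketch {
  /** Formats chunks on a pool while keeping at most maxInFlight chunks pending. */
  static <T, R> List<R> process(
      List<List<T>> chunks, Function<List<T>, R> format, int maxInFlight)
      throws InterruptedException, ExecutionException {
    ExecutorService workers =
        Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    Deque<Future<R>> inFlight = new ArrayDeque<>();
    List<R> written = new ArrayList<>(); // stand-in for writing to the output stream
    try {
      for (List<T> chunk : chunks) {
        if (inFlight.size() == maxInFlight) {
          // Drain the oldest chunk first: this preserves order and caps memory use.
          written.add(inFlight.removeFirst().get());
        }
        Callable<R> task = () -> format.apply(chunk);
        inFlight.addLast(workers.submit(task));
      }
      while (!inFlight.isEmpty()) {
        written.add(inFlight.removeFirst().get());
      }
    } finally {
      workers.shutdownNow(); // also interrupts running work if we exit early
    }
    return written;
  }
}
```

With `maxInFlight` equal to the number of processors, every worker can have a chunk in progress while the caller writes the oldest result; a value of twice that adds slack at the cost of holding more formatted chunks in memory.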

@zhengwei143 zhengwei143 self-requested a review November 15, 2024 14:24
}
}
};
}

private static byte[] writeDelimited(Build.Target targetProtoBuffer) {
Contributor

Is there a significant performance benefit to converting them to byte[] instead of just leaving them as Build.Target protos for the consumer to write?

If most of the benefit we gain comes from parallelizing toTargetProtoBuffer(), then perhaps we could reduce the complexity here and just deal with writing protos delimited to the output stream?

Collaborator

Just noting precedent: https://cs.opensource.google/bazel/bazel/+/master:src/main/java/com/google/devtools/build/lib/runtime/ExecutionGraphModule.java;l=638. The byte representation probably takes up less memory while in the queue?

Contributor

Happy to leave it as is, since this is probably quite memory intensive.

Contributor

(do leave a comment here with regards to that)

copybara-service bot pushed a commit that referenced this pull request Nov 27, 2024
…rectly to a file

This is a proposed fix for #24293

This speeds up a fully warm `bazel query ...` by 23.7%, reducing wall time from 1m49s to 1m23s

```
$ time bazel query '...' --output=streamed_proto > queryoutput4.streamedproto

real    1m48.768s
user    0m27.410s
sys     0m19.646s

$ time bazel query '...' --output=streamed_proto --output_file=queryoutput5.streamedproto

real    1m22.920s
user    0m0.045s
sys     0m0.016s
```

_💁‍♂️ Note: when combined with #24305, total wall time is 37s, an overall reduction of 66%._

Closes #24298.

PiperOrigin-RevId: 700583890
Change-Id: Ic13f0611aca60c2ce8641e72a0fcfc330f13c803
iancha1992 pushed a commit to iancha1992/bazel that referenced this pull request Dec 2, 2024
iancha1992 pushed a commit to iancha1992/bazel that referenced this pull request Dec 12, 2024
meteorcloudy added a commit that referenced this pull request Dec 19, 2024
…sults di… (#24667)

Commit
791e1f7

Co-authored-by: Keith Lea
Co-authored-by: Yun Peng
keithl-stripe (Contributor Author)

Hi @michajlo! Happy new year! I would love it if you could take another look at this PR, as I think I have addressed all of your concerns. There is a TODO I still need to deal with, and I feel that I should add a test (perhaps here), but the logic itself is ready for your review.
